Source code for hysop.backend.device.opencl.opencl_env

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import hashlib, os, copy, re
import numpy as np

from hysop import vprint, dprint
from hysop import (
    __VERBOSE__,
    __KERNEL_DEBUG__,
    __DEFAULT_PLATFORM_ID__,
    __DEFAULT_DEVICE_ID__,
)
from hysop.constants import Precision, DeviceType
from hysop.tools.htypes import check_instance, first_not_None
from hysop.tools.io_utils import IO
from hysop.tools.units import bytes2str
from hysop.tools.warning import HysopWarning
from hysop.tools.string_utils import framed_str
from hysop.core.mpi import main_rank

from hysop.backend.device.opencl import (
    cl,
    clTools,
    __OPENCL_PROFILE__,
    OPENCL_KERNEL_DUMP_FOLDER,
)
from hysop.backend.device.opencl.opencl_tools import (
    convert_device_type,
    convert_precision,
)
from hysop.backend.device.opencl.opencl_tools import (
    get_platform,
    get_context,
    get_device,
    create_queue,
    parse_opencl_file,
)
from hysop.backend.device.opencl.opencl_allocator import OpenClImmediateAllocator
from hysop.tools.handle import TaggedObject


[docs] class OpenClEnvironment(TaggedObject): """ OpenCL environment. """ def __new__( cls, mpi_params, platform_id=None, device_id=None, device_type=None, gl_sharing=False, strict=True, name=None, **kwds, ): return super().__new__(cls, tag_prefix="clenv", **kwds) def __init__( self, mpi_params, platform_id=None, device_id=None, device_type=None, gl_sharing=False, strict=True, name=None, **kwds, ): """Initialize an OpenCL environment Parameters ---------- platform_id : int chosen platform id. device_id : int chosen device id. device_type : string chosen device type. gl_sharing : bool, optional True to build a context shared between OpenGL and OpenCL. Default=False. mpi_params: mpi parameters, optional Communicator which handles the OpenCL env. strict: bool, optional Use strict device and platform checks. Try to match exactly given platform and device IDs. Also try to match given device type. Else raise an error. If this is set to False, try to fallback to a working compute device (possibly on a different platform and of a different device type). name : str, optional Name used for memory pool logging. Defaults to device name. kwds: dict Extra arguments for memory pool creation. Notes ----- See hysop.backend.device.opencl.opencl_tools.get_or_create_opencl_env() to create an OpenClEnvironment that will persist and thus maximize memory pool memory reuse on target device. """ super().__init__(tag_prefix="clenv", **kwds) platform_id = first_not_None(platform_id, __DEFAULT_PLATFORM_ID__) device_id = first_not_None(device_id, __DEFAULT_DEVICE_ID__) device_type = first_not_None(device_type, DeviceType.ALL) msg = """ name: {} platform_id: {} device_id: {} device_type: {} gl_sharing: {} comm size: {}""".format( name, platform_id, device_id, device_type, gl_sharing, mpi_params.size ) device_type = convert_device_type(device_type) try: # OpenCL platform platform = get_platform(platform_id, strict=strict) # OpenCL device device = get_device(platform, device_id, device_type, strict=strict) except: title = " while creating the following OpenCL environment " msg = framed_str(title=title, msg=msg) print(msg + "\n") raise # OpenCL context context = get_context(device, gl_sharing) # OpenCL default queue self._queues = {"default_queue": create_queue(context)} self._default_queue_enabled = True queue = self.default_queue # OpenCL allocator allocator = OpenClImmediateAllocator( queue=queue, mem_flags=cl.mem_flags.READ_WRITE ) # OpenCL memory pool if name is None: name = device.name pos = name.find("@") if pos > 0: name = name[:pos] name = name.strip() self._platform = platform self._device = device self._context = context self._allocator = allocator self._cl_version = self._parse_opencl_version() try: device_type_str = cl.device_type.to_string(device.type) except ValueError: device_type_str = f"UNKNOWN DEVICE TYPE {device.type}" self.device_type_str = device_type_str msg += """ -- Platform -- *plat id: {} *name: {} *version: {} -- Device -- *dev id: {} *name: {} *type: {} *version: {} *global mem size: {}""".format( platform_id, platform.name.strip(), platform.version, device_id, device.name.strip(), device_type_str, device.opencl_c_version, bytes2str(device.global_mem_size), ) if context.properties: msg += "\n" msg += "\n -- Context --" msg += f"\n *properties: {context.properties}" if queue.properties: msg += "\n" msg += "\n -- Queue --" msg += f"\n *properties: {queue.properties}" title = f" Creating OpenCL environment {self.tag} " msg = framed_str(title=title, msg=msg) vprint(msg) # Floating point codegeneration mode _kargs = {"device": device, "context": context, "platform": platform} if __KERNEL_DEBUG__: _kargs["float_dump_mode"] = "dec" else: _kargs["float_dump_mode"] = "hex" self.default_build_opts = [] if __OPENCL_PROFILE__ and self.device.vendor.find("NVIDIA") >= 0: self.default_build_opts.append("-cl-nv-verbose") self.macros = {} self._mpi_params = mpi_params self.is_master = mpi_params.rank == 0 self.is_multi_device = mpi_params.size > 1 self.platform_id = platform_id self.device_id = device_id self.name = name self._check_comm_devices()
[docs] def build_typegen( self, precision, float_dump_mode, use_short_circuit_ops, unroll_loops ): from hysop.constants import Precision from hysop.backend.device.opencl.opencl_types import OpenClTypeGen if precision not in (np.float16, np.float32, np.float64): check_instance(precision, Precision) if precision in [Precision.LONG_DOUBLE, Precision.QUAD, Precision.SAME]: msg = "Precision {} is not supported for OpenCl environment." msg = msg.format(precision) raise ValueError(msg) precision = convert_precision(precision) if precision == np.float16: fbtype = "half" elif precision == np.float32: fbtype = "float" elif precision == np.float64: fbtype = "double" else: raise ValueError(f"Unknown floating point precision {precision}!") return OpenClTypeGen( device=self.device, platform=self.platform, context=self.context, fbtype=fbtype, float_dump_mode=float_dump_mode, use_short_circuit_ops=use_short_circuit_ops, unroll_loops=unroll_loops, )
def __eq__(self, other): return self is other def __ne__(self, other): return self is not other def __hash__(self): return id(self)
[docs] def extensions(self): return [ ext.strip() for ext in self._device.extensions.split(" ") if ext.strip() != "" ]
[docs] def has_extension(self, extension): return extension in self.extensions()
[docs] def pci_bus_id(self): """ Return the PCI bus id of this device if possible. Format is '0000:bus:device.function' 8+5+3 = 16 bits Example: 0000:01:00.0 """ if self.has_extension("cl_nv_device_attribute_query"): bus_id = self.device.pci_bus_id_nv slot_id = self.device.pci_slot_id_nv dev_id = slot_id >> 3 fn_id = slot_id & 0x07 bus_id0 = bus_id >> 8 bus_id1 = bus_id & 0xFF return f"{bus_id0:04x}:{bus_id1:02x}:{dev_id:02x}.{fn_id:01x}" elif self.has_extension("cl_amd_device_topology"): topo = self.device.topology_amd bus_id = topo.pcie.bus dev_id = topo.pcie.device fn_id = topo.pcie.function bus_id0 = bus_id >> 8 # not sure if usefull bus_id1 = bus_id & 0xFF return f"{bus_id0:04x}:{bus_id1:02x}:{dev_id:02x}.{fn_id:01x}" else: return f"unknown({self.platform_id},{self.device_id})"
[docs] def device_identifier(self): from hysop.core.mpi import processor_name processor_name = processor_name.lower().strip().replace(" ", "_") device = self.device.name.lower().strip().replace(" ", "_") platform = self.platform.name.lower().strip().replace(" ", "_") pci_bus_id = self.pci_bus_id() identifier = f"{processor_name}__{platform}__{device}__{pci_bus_id}" return identifier
def _check_comm_devices(self): identifier = self.device_identifier() comm = self.mpi_params.comm devices = comm.gather(sendobj=(self.mpi_params.rank, identifier), root=0) if self.is_master: device_identifiers = tuple(dev[1] for dev in devices) good = len(device_identifiers) == len(set(device_identifiers)) formatted_devices = (f"rank {rank}: {dev_id}" for (rank, dev_id) in devices) msg = "\n" + "*" * 82 msg += "\nOPENCL WARNING: The same OpenCL compute device will be used by multiple processes:" msg += "\n *" + "\n *".join(formatted_devices) msg += "\n This may drastically reduce hardware performances." msg += "\n" + "*" * 82 else: good, msg = None, None (good, msg) = comm.bcast(obj=(good, msg), root=0) if not good: import warnings warnings.warn(msg, HysopWarning)
[docs] def enable_default_queue(self): self._default_queue_enabled = True
[docs] def disable_default_queue(self): self._default_queue_enabled = False
[docs] def get_platform(self): return self._platform
[docs] def get_context(self): return self._context
[docs] def get_device(self): return self._device
[docs] def get_queues(self): return self._queues
[docs] def get_allocator(self): return self._allocator
# def get_memory_pool(self): # return self._mempool
[docs] def get_default_queue(self): if not self._default_queue_enabled: msg = "Default queue has been disabled." raise RuntimeError(msg) return self.queue("default_queue")
[docs] def get_mpi_params(self): return self._mpi_params
[docs] def get_cl_version(self): return self._cl_version
def _parse_opencl_version(self): assert self.device is not None sversion = self.device.version.strip() _regexp = r"OpenCL\s+(\d)\.(\d)" regexp = re.compile(_regexp) match = re.match(regexp, sversion) if not match: msg = "Could not extract OpenCL version from device returned version '{}' " msg += "and regular expression '{}'." msg = msg.format(sversion, _regexp) raise RuntimeError(msg) major = match.group(1) minor = match.group(2) return (major, minor) platform = property(get_platform) context = property(get_context) device = property(get_device) queues = property(get_queues) allocator = property(get_allocator) cl_version = property(get_cl_version) # memory_pool = property(get_memory_pool) default_queue = property(get_default_queue) mpi_params = property(get_mpi_params)
[docs] def queue(self, name): return self._queues[name]
[docs] def create_queue(self, name): """Create OpenCL queue from current context""" assert name not in self._queues queue = create_queue(self.context()) self._queues[name] = queue return queue
def _create_cl_program( self, file_list, vector_width=4, nb_remesh_components=1, build_options="", force_verbose=None, force_debug=None, ): """Build OpenCL sources Parameters ---------- files : string or list of strings user defined files names vector_width : int, optional OpenCL vector type width, default=4 nb_remesh_components : int, optional number of remeshed components, default=1 build_options: string additional OpenCL compile flags force_verbose: bool, optional, default=None force verbose mode force_debug: bool, optional, default=None force debug mode (kernel source dumping and preprocessing) Returns OpenCL kernel Parse the sources to handle single and double precision. """ VERBOSE = __VERBOSE__ if (force_verbose is None) else force_verbose DEBUG = __KERNEL_DEBUG__ if (force_debug is None) else force_debug gpu_src = "" if ( cl.device_type.to_string(self.device.type) == "GPU" and self.precision is DOUBLE_GPU ): gpu_src += "#pragma OPENCL EXTENSION cl_khr_fp64: enable \n" if isinstance(files, list): file_list = files else: file_list = [files] if VERBOSE: print("=== Kernel sources compiling ===") for sd in file_list: print(" - ", sf) for sf in file_list: # search and open cl file. try: f = open(sf) except OSError as ioe: if ioe.errno == 2: # path to cl files inside hysop.gpu package f = open(OPENCL_SRC + sf) else: raise ioe gpu_src += "".join(self.parse_file(f, vector_width, nb_remesh_components)) f.close() # output gpu_src if self.macros is not None: for k in self.macros: gpu_src = gpu_src.replace(k, str(self.macros[k])) if self.precision is FLOAT_GPU: # Rexexp to add 'f' suffix to float constants # Match 1.2, 1.234, 1.2e3, 1.2E-05 float_replace = re.compile(r"(?P<float>\d\.\d+((e|E)-?\d+)?)") gpu_src = float_replace.sub(r"\g<float>f", gpu_src) else: gpu_src = gpu_src.replace("float", "double") # Log final opencl generated code for debug purposes if DEBUG: kernel_name = (file_list[-1].split("/")[-1]).replace(".cl", "_parsed") def listformat(L): if isinstance(L, str): L = L.replace("-D ", "").split(" ") L = list(L) for empty in ["", " "]: if empty in L: L.remove(empty) return "\n\t\t" + "\n\t\t".join(L) dump_prefix = """ /* Dumped OpenCL Kernel '{}' vector_width: {} nb_remesh_components: {} source_files: {} default_build_opts: {} all build_options: {} */ """.format( kernel_name, vector_width, nb_remesh_components, listformat(file_list), listformat(self.default_build_opts), listformat(build_options), ) dumped_src = dump_prefix + gpu_src dump_folder = os.path.join(IO.default_path(), OPENCL_KERNEL_DUMP_FOLDER) dump_file_prefix = os.path.join( dump_folder, f"rk{main_rank}_" + kernel_name ) tmp_dump_file = dump_file_prefix + ".c" dump_file = dump_file_prefix + ".cl" if not os.path.exists(dump_folder) and (main_rank == 0): os.makedirs(dump_folder) with open(tmp_dump_file, "w+") as f: f.write(dumped_src) try: # try to preprocess sources import subprocess opts = build_options opts = re.sub("-cl-([a-z0-9]+-?)+ ", "", opts) cmd = [ "gcc", opts, "-E", "-c", tmp_dump_file, "-o", dump_file_prefix + "_preprocessed.cl", ] subprocess.check_call(" ".join(cmd), shell=True) finally: os.rename(tmp_dump_file, dump_file) if VERBOSE: msg = f"OpenCL kernel {kernel_name} source dumped to {dump_file}." print(msg) # OpenCL program prg = cl.Program(self.context, gpu_src) return prg
[docs] def build_src( self, files, build_options="", vector_width=4, nb_remesh_components=1 ): """Build OpenCL sources Parameters ---------- files : string or list of strings user defined file names build_options : string, optional Compiler options, default="" vector_width : int, optional OpenCL vector type width, default=4 nb_remesh_components : int, optional number of remeshed components, default=1 force_verbose: bool, optional force verbose mode force_debug: bool, optional force debug mode (kernel dumping) Returns OpenCL binaries Parse the sources to handle single and double precision. """ if isinstance(files, list): file_list = files else: file_list = [files] vprint("=== Kernel sources compiling ===") for sf in file_list: vprint(" - ", sf) # --- create kernel from cl files --- prg = self._create_cl_program( files=file_list, build_options=build_options, vector_width=vector_width, nb_remesh_components=nb_remesh_components, force_verbose=force_verbose, force_debug=force_debug, ) # --- Build kernel --- try: build = prg.build(build_options) except Exception as e: print("Build files : ") for sf in file_list: print(" - ", sf) print("Build options : ", build_options) print("Vectorization : ", vector_width) raise e # display post-build info vprint( "Build options : ", build.get_build_info(self.device, cl.program_build_info.OPTIONS), ) vprint( "Compiler status : ", build.get_build_info(self.device, cl.program_build_info.STATUS), ) vprint( "Compiler log : ", build.get_build_info(self.device, cl.program_build_info.LOG), ) vprint("===\n") return build
[docs] def build_raw_src( self, src, build_options=[], kernel_name=None, force_verbose=None, force_debug=None, ): """Build raw OpenCL sources Parameters ---------- src : string OpenCL source code build_options : string Compiler options to use for building Returns OpenCL binaries """ assert isinstance(build_options, (list, set, tuple)) build_opts = self.default_build_opts + list(build_options) VERBOSE = False if (force_verbose is None) else force_verbose DEBUG = False if (force_debug is None) else force_debug gpu_src = src src_hash = hashlib.sha1(gpu_src.encode("utf-8")).hexdigest() if kernel_name is None: kernel_name = src_hash else: kernel_name += f"_{src_hash[:4]}" if VERBOSE: print("=== Kernel raw source compiling ===") prg = cl.Program(self.context, gpu_src) dump_folder = os.path.join(IO.default_path(), OPENCL_KERNEL_DUMP_FOLDER) if DEBUG: # dump kernel source while in debug mode if not os.path.exists(dump_folder) and (main_rank == 0): os.makedirs(dump_folder) dump_file = os.path.join( dump_folder, f"rk{main_rank}_{kernel_name}_dump.cl" ) print(f"Dumping kernel src at '{dump_file}'.") with open(dump_file, "w+") as f: f.write(gpu_src) s_build_opts = " ".join(build_opts) if VERBOSE: print(f"Build options: {s_build_opts}") print("Building...") # Build OpenCL program try: build = prg.build(s_build_opts) except Exception as e: # always dump source when build fails if not os.path.exists(dump_folder) and (main_rank == 0): os.makedirs(dump_folder) dump_file = os.path.join( dump_folder, f"rk{main_rank}_{kernel_name}_build_fail.cl" ) with open(dump_file, "w+") as f: f.write(gpu_src) print("Build options : ", s_build_opts) print(f"Build Failed: dumped source to {dump_file}.") raise e if VERBOSE: print( "Compiler status: {}".format( build.get_build_info(self.device, cl.program_build_info.STATUS) ) ) print( "Compiler log: {}".format( build.get_build_info(self.device, cl.program_build_info.LOG) ) ) return build
def __str__(self): platform, context, device, queue = ( self.platform, self.context, self.device, self.default_queue, ) msg = """ -- Platform -- *name: {} *version: {} -- Device -- *name: {} *type: {} *version: {} *global mem size: {} """.format( platform.name, platform.version, device.name, self.device_type_str, device.opencl_c_version, bytes2str(device.global_mem_size), ) if context.properties: msg += "\n -- Context --" msg += f"\n *properties: {context.properties}" msg += "\n" if queue.properties: msg += "\n -- Queue --" msg += f"\n *properties: {queue.properties}" msg += "\n" return msg